# Create a Kafka consumer that connects over SSL with the cluster credentials.
# Key and value deserializers must also be set on the consumer.
from kafka import KafkaConsumer
import json
def _decode_json(raw):
    """Deserialize a raw Kafka key or value payload from JSON.

    Args:
        raw: The undecoded payload bytes, or None when the record has no
            key/value (kafka-python passes None straight to the
            deserializer in that case).

    Returns:
        The decoded Python object, or None when ``raw`` is None.

    Raises:
        UnicodeDecodeError: If the payload is not valid UTF-8.
        json.JSONDecodeError: If the payload is not valid JSON.
    """
    if raw is None:
        return None
    # UTF-8 is the standard JSON encoding and a superset of ASCII, so every
    # payload that decoded with 'ascii' before still decodes identically.
    return json.loads(raw.decode("utf-8"))


# Connect over SSL using the CA certificate, service certificate and service
# key files, which are resolved relative to the current working directory.
consumer = KafkaConsumer(
    client_id="client1",
    bootstrap_servers="kafka-4da1624-wssitu-05f0.aivencloud.com:19068",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
    value_deserializer=_decode_json,
    key_deserializer=_decode_json,
    max_poll_records=10,
)
# List every topic visible to this consumer. The result is printed
# explicitly because a bare expression shows nothing when run as a script.
print(consumer.topics())
# Example output: {'Pizza'}

topic_name = "Pizza"

# Subscribe to a list of topics, in this case only topic_name ("Pizza").
consumer.subscribe(topics=[topic_name])

# Check what the consumer is currently subscribed to.
print(consumer.subscription())
# Example output: {'Pizza'}
# Continuously print messages from the subscribed Kafka topic. Iterating the
# consumer keeps waiting for new records, so this loop normally only ends
# when the process is interrupted; the finally clause makes sure the
# consumer's connections are released when that happens.
try:
    for message in consumer:
        print("Partition is ", message.partition, "Offset is ", message.offset, "key is ", message.key, "value is ", message.value)
finally:
    consumer.close()

# Example output from a previous run:
# Partition is 0 Offset is 67 key is {'id': 1} value is {'name': 'John', 'pizza': 'Cheese'}
# Partition is 0 Offset is 68 key is {'id': 2} value is {'name': 'Steve', 'pizza': 'Pepperoni'}
# Partition is 0 Offset is 69 key is {'id': 3} value is {'name': 'Kate', 'pizza': 'Steak'}
# Partition is 0 Offset is 70 key is {'id': 4} value is {'name': 'Jason', 'pizza': 'Chicken'}
# Partition is 0 Offset is 71 key is {'id': 5} value is {'name': 'Kevin', 'pizza': 'Donaid'}